/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "orc/Exceptions.hh"
#include "InputStream.hh"

#include <algorithm>
#include <iomanip>

namespace orc {

  void printBuffer(std::ostream& out,
                   const char *buffer,
                   uint64_t length) {
    const uint64_t width = 24;
    out << std::hex;
    for(uint64_t line = 0; line < (length + width - 1) / width; ++line) {
      out << std::setfill('0') << std::setw(7) << (line * width);
      for(uint64_t byte = 0;
          byte < width && line * width + byte < length; ++byte) {
        out << " " << std::setfill('0') << std::setw(2)
            << static_cast<uint64_t>(0xff & buffer[line * width +
                                                   byte]);
      }
      out << "\n";
    }
    out << std::dec;
  }

  PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
    position = posns.begin();
  }

  uint64_t PositionProvider::next() {
    uint64_t result = *position;
    ++position;
    return result;
  }

  SeekableInputStream::~SeekableInputStream() {
    // PASS
  }

  SeekableArrayInputStream::~SeekableArrayInputStream() {
    // PASS
  }

  SeekableArrayInputStream::SeekableArrayInputStream
               (const unsigned char* values,
                uint64_t size,
                uint64_t blkSize
               ): data(reinterpret_cast<const char*>(values)) {
    length = size;
    position = 0;
    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
  }

  SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
                                                     uint64_t size,
                                                     uint64_t blkSize
  ): data(values) {
    length = size;
    position = 0;
    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
  }

  bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
    uint64_t currentSize = std::min(length - position, blockSize);
    if (currentSize > 0) {
      *buffer = data + position;
      *size = static_cast<int>(currentSize);
      position += currentSize;
      return true;
    }
    *size = 0;
    return false;
  }

  void SeekableArrayInputStream::BackUp(int count) {
    if (count >= 0) {
      uint64_t unsignedCount = static_cast<uint64_t>(count);
      if (unsignedCount <= blockSize && unsignedCount <= position) {
        position -= unsignedCount;
      } else {
        throw std::logic_error("Can't backup that much!");
      }
    }
  }

  bool SeekableArrayInputStream::Skip(int count) {
    if (count >= 0) {
      uint64_t unsignedCount = static_cast<uint64_t>(count);
      if (unsignedCount + position <= length) {
        position += unsignedCount;
        return true;
      } else {
        position = length;
      }
    }
    return false;
  }

  google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
    return static_cast<google::protobuf::int64>(position);
  }

  void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
    position = seekPosition.next();
  }

  std::string SeekableArrayInputStream::getName() const {
    std::ostringstream result;
    result << "SeekableArrayInputStream " << position << " of " << length;
    return result.str();
  }

  static uint64_t computeBlock(uint64_t request, uint64_t length) {
    return std::min(length, request == 0 ? 256 * 1024 : request);
  }

  SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
                                                   uint64_t offset,
                                                   uint64_t byteCount,
                                                   MemoryPool& _pool,
                                                   uint64_t _blockSize
                                                   ):pool(_pool),
                                                     input(stream),
                                                     start(offset),
                                                     length(byteCount),
                                                     blockSize(computeBlock
                                                               (_blockSize,
                                                                length)) {

    position = 0;
    buffer.reset(new DataBuffer<char>(pool));
    pushBack = 0;
  }

  SeekableFileInputStream::~SeekableFileInputStream() {
    // PASS
  }

  bool SeekableFileInputStream::Next(const void** data, int*size) {
    uint64_t bytesRead;
    if (pushBack != 0) {
      *data = buffer->data() + (buffer->size() - pushBack);
      bytesRead = pushBack;
    } else {
      bytesRead = std::min(length - position, blockSize);
      buffer->resize(bytesRead);
      if (bytesRead > 0) {
        input->read(buffer->data(), bytesRead, start+position);
        *data = static_cast<void*>(buffer->data());
      }
    }
    position += bytesRead;
    pushBack = 0;
    *size = static_cast<int>(bytesRead);
    return bytesRead != 0;
  }

  void SeekableFileInputStream::BackUp(int signedCount) {
    if (signedCount < 0) {
      throw std::logic_error("can't backup negative distances");
    }
    uint64_t count = static_cast<uint64_t>(signedCount);
    if (pushBack > 0) {
      throw std::logic_error("can't backup unless we just called Next");
    }
    if (count > blockSize || count > position) {
      throw std::logic_error("can't backup that far");
    }
    pushBack = static_cast<uint64_t>(count);
    position -= pushBack;
  }

  bool SeekableFileInputStream::Skip(int signedCount) {
    if (signedCount < 0) {
      return false;
    }
    uint64_t count = static_cast<uint64_t>(signedCount);
    position = std::min(position + count, length);
    pushBack = 0;
    return position < length;
  }

  int64_t SeekableFileInputStream::ByteCount() const {
    return static_cast<int64_t>(position);
  }

  void SeekableFileInputStream::seek(PositionProvider& location) {
    position = location.next();
    if (position > length) {
      position = length;
      throw std::logic_error("seek too far");
    }
    pushBack = 0;
  }

  std::string SeekableFileInputStream::getName() const {
    std::ostringstream result;
    result << input->getName() << " from " << start << " for "
           << length;
    return result.str();
  }

}
